package com.ml4ai.flink

import java.lang
import java.util.concurrent.ArrayBlockingQueue

import com.ml4ai.core.stack.mq.RabbitMQAgent
import com.ml4ai.flink.hadoopDFS.HadoopValueBatchSerializeOutputFormat
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object Scala {

  def main(arg: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val queue = new ArrayBlockingQueue[String](100)

    val rabbitMQ = RabbitMQAgent.builder().vHost("/app").host("master").port(5672).user("app").password("app").build
    rabbitMQ.declareQueue("action", true, false, false, null)
    rabbitMQ.produceText("", "action", "true", true)
    rabbitMQ.consumeText("action", 1, new java.util.function.Function[java.lang.String, java.lang.Boolean]() {

      override def apply(t: java.lang.String): java.lang.Boolean = {
        queue.offer(t)
        true
      }

    })

    var isExiting = false
    while (!isExiting) {
      val message = queue.take()
      if (message.eq("exit")) {
        isExiting = true
      } else {
        val dataSet = env.fromCollection((1 to 1000000).map(String.valueOf)).rebalance
        dataSet.output(new HadoopValueBatchSerializeOutputFormat[String]("/data/sink/dataset", 100))
        env.execute("daemon")
      }
    }
  }

}

object Stream {

  def main(ar: Array[String]): Unit = {

    import org.apache.flink.streaming.api.windowing.time.Time
    import collection.JavaConverters._

    val sEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val dataStream = sEnv.addSource(new SourceFunction[String] {
      override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
        val rabbitMQ = RabbitMQAgent.builder().vHost("/app").host("master").port(5672).user("app").password("app").build
        rabbitMQ.declareQueue("action", true, false, false, null)
        rabbitMQ.consumeText("action", 1, new java.util.function.Function[java.lang.String, java.lang.Boolean]() {
          override def apply(t: java.lang.String): java.lang.Boolean = {
            sourceContext.collect(t)
            true
          }
        })
        Thread.sleep(100000000000L)
      }

      override def cancel(): Unit = {

      }
    })

    dataStream.flatMap(new FlatMapFunction[String, String] {
      override def flatMap(t: String, collector: Collector[String]): Unit = {
        t.toCharArray.map(String.valueOf).foreach(collector.collect)
      }
    }).timeWindowAll(Time.seconds(5)).apply(new AllWindowFunction[String, String, TimeWindow] {
      override def apply(window: TimeWindow, values: lang.Iterable[String], out: Collector[String]): Unit = {
        values.iterator.asScala.foreach(out.collect)
      }
    }).print()

    sEnv.execute("后台程序")
  }

}

